package cn.cup.dmp.report.MediaAnalysis

import java.util.Properties

import cn.cup.dmp.config.ConfigHelper
import cn.cup.dmp.utils.{MyJedisPool, RptKpiHandler}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
 * Spark job that builds the per-app (media) KPI report.
 *
 * Steps, as visible in this file:
 *  1. Read the parquet data set located at `ConfigHelper.destPath`.
 *  2. For every row, decide on an app name: use the row's `appname` column when
 *     it is non-empty, otherwise look the `appid` up in the Redis hash
 *     `appDictory`, otherwise fall back to "未知".
 *  3. Pair the app name with the KPI list returned by `RptKpiHandler.rptKpi(row)`
 *     and sum the KPI lists element-wise per app name.
 *  4. Write the aggregated rows to the JDBC table `ConfigHelper.app_Analysis_table`.
 */
object MediaAnalysisWithRedis {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used by this job)
   */
  def main(args: Array[String]): Unit = {

    // NOTE(review): the master is hard-coded to local mode; confirm this is
    // intended before submitting the job to a cluster.
    val conf = new SparkConf().setAppName("媒体报表分析").setMaster("local[*]")
    conf.set("spark.serializer", ConfigHelper.ser)
    val sc = new SparkContext(conf)

    // Always stop the SparkContext, even when a stage of the job fails.
    try {
      val sQLContext = new SQLContext(sc)

      // Kept as local vals so the partition closure only captures plain strings.
      val unknownAppName = "未知"
      val appDictKey = "appDictory"

      // Read the input data.
      val file: DataFrame = sQLContext.read.parquet(ConfigHelper.destPath)

      // Intermediate result: (appName, KPI list) per input row.
      val res: RDD[(String, List[Double])] = file.mapPartitions(iter => {
        // One Redis connection per partition rather than one per row.
        val jedis = MyJedisPool.getRedis()
        try {
          iter.map(row => {
            val rowAppName = row.getAs[String]("appname")
            val appId = row.getAs[String]("appid")
            val appName =
              if (!StringUtils.isEmpty(rowAppName)) {
                rowAppName
              } else if (StringUtils.isEmpty(appId)) {
                unknownAppName
              } else {
                // A single HGET replaces HEXISTS + HGET: one round trip instead of
                // two, and a missing field (null reply) maps to the fallback name.
                Option(jedis.hget(appDictKey, appId)).getOrElse(unknownAppName)
              }
            (appName, RptKpiHandler.rptKpi(row))
          }).toVector.iterator
          // `iter.map` is lazy: without materializing here, the lookups above
          // would only run after the connection has already been closed below.
        } finally {
          jedis.close()
        }
      })

      // Group by app name and sum the KPI lists position by position.
      val value: RDD[(String, List[Double])] =
        res.reduceByKey((left, right) => left.zip(right).map { case (a, b) => a + b })

      // Convert to a DataFrame with one named column per KPI.
      import sQLContext.implicits._
      val frame = value.map { case (appName, kpi) =>
        (appName, kpi(0).toInt, kpi(1).toInt, kpi(2).toInt, kpi(3).toInt, kpi(4).toInt,
          kpi(5), kpi(6), kpi(7).toInt, kpi(8).toInt)
      }.toDF("appName", "original_request_number", "valid_requests", "meet_condition_request",
        "partitionbidsNum", "success_bid_Num", "everytime_consump", "everytime_cost", "show_times", "click_times")

      // Store the report in the MySQL database.
      val connetProperties = new Properties()
      connetProperties.setProperty("user", ConfigHelper.username)
      connetProperties.setProperty("password", ConfigHelper.password)
      frame.write.jdbc(ConfigHelper.url, ConfigHelper.app_Analysis_table, connetProperties)
    } finally {
      sc.stop()
    }
  }

}
